CCP's Implementation Based on Linux Kernel

CCP’s Implementation Based on Linux Kernel

CCP 简介

The congestion control plane (CCP) is a new platform for writing and sharing datapath-agnostic congestion control algorithms. It makes it easy to to program sophisticated algorithms (write Rust or Python in a safe user-space environment as opposed to writing C and a risk of crashing your kernel), and allows the same algorithm implementation to be run on a variety of datapaths (Linux Kernel, DPDK or QUIC).

CCP 整体工作流程梳理

  1. 新的拥塞控制算法或者拥塞控制架构都是以注册函数的形式注册进内核,Linux 内核提供了注册拥塞控制算法的接口;

    1
    2
    3
    4
      tcp_register_congestion_control(&tcp_ccp_congestion_ops);//实现注册
    struct tcp_congestion_ops tcp_ccp_congestion_ops = {//注册内容
    ...
    };
  2. 拥塞控制算法的实现依赖于对网络中相关特性的测量,例如 RTT, Bandwidth, packet loss等元素。如何获取这些元素?以下3个结构体,它们均由内核提供,提供了基本的已经测量完成的测量元素,可以直接从结构体中读出来。

    1
    2
    3
    struct sock;//include/net/sock.h
    struct tcp_sock
    struct rate_sample;// include/net/tcp.h

    以上3个由内核提供的结构体可以提供基本测量元素,例如 bytes_acked, interval_us, rtt_us, losses等。但是,很多时候拥塞控制算法的实现并不单单是使用基本测量元素,而是在基本元素的基础上进行加工而成。举个例子,接受速率或者发送速率需要通过其他基本元素计算出来。CCP针对不同的拥塞控制算法,通过调研,总结出了15种需要测量的元素,又称 primitives,这些元素都是可以通过对socket中的相关数据做简单运算得出来。下列函数实现了该功能。

    1
    int load_primitives(struct sock *sk, const struct rate_sample *rs);//ccp-kernel/tcp_ccp.c

    测量结果放在指定的寄存器中。

  3. 数据平面和控制平面的通信。控制平面位于用户空间,数据平面位于内核空间;数据平面提供控制平面表征网络状况的的测量元素,控制平面根据某种拥塞控制算法来修改数据平面的 cwnd, pacing rate等特性。控制平面和数据平面通过 netlink进行通信。
    3.1 数据平面会计算出所有的可能用到的测量元素,但是传输给控制平面的元素仅是由控制面的拥塞控制算法指定的元素。可以这样简单进行理解,但是实际的实现过程略复杂。
    注:做如下定义:
    一级测量元:直接从内核中读取到的测量元素;
    二级测量元:对一级测量元进行简单加工得到的15种primitives;
    三级测量元:对二级测量元进行运算而成,具体的运算过程由相应的运算控制算法决定。
    可以说,控制平面在一定程度上定义了元素的测量方法。

    3.2 控制平面在算法初始时会将算法需要的测量元素(测量元素的计算方法)告知数据平面,之后会根据数据面传来的测量元素进行决策。

  4. 控制平面的决策结果如何在决策面生效?例如,控制平面计算出了cwnd的值,那么如何修改发送端的cwnd值呢?首先,控制面将结果发送给数据面,数据面修改内核提供的结构体来达到修改cwnd的值。关键函数如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
      static void do_set_cwnd(
    struct ccp_datapath *dp,
    struct ccp_connection *conn,
    uint32_t cwnd
    ) {
    struct sock *sk;
    struct tcp_sock *tp;
    get_sock_from_ccp(&sk, conn);
    tp = tcp_sk(sk);

    // translate cwnd value back into packets
    cwnd /= tp->mss_cache;
    tp->snd_cwnd = cwnd;
    }

    分析上述代码可知,控制平面在sock 结构体上修改,数据平面替换掉原来的sock来达到修改cwnd的效果。


下文是相关函数的具体分析

数据平面(Linux Kernel)代码分析

参考文献,后面有时间再看。
文件定位: tcp_CCP.c

  1. 通过module_init将当前模块加载进内核;

    1
    module_init(tcp_ccp_register);
  2. 从tcp_ccp_register函数开启我们的旅程,关键代码: ccp_datapath 和 ccp 之间的关系

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
        struct ccp_datapath dp = {
    .set_cwnd = &do_set_cwnd,
    .set_rate_abs = &do_set_rate_abs,
    .set_rate_rel = &do_set_rate_rel,
    .now = &ccp_now,
    .since_usecs = &ccp_since,
    .after_usecs = &ccp_after
    };
    //Attached:
    struct ccp {
    // control
    u32 last_snd_una; // 4 B
    u32 last_bytes_acked; // 8 B
    u32 last_sacked_out; // 12 B
    struct skb_info *skb_array; // array of future skb information

    // communication
    struct ccp_connection *dp;
    };

这里只以netlink通信方式进行代码分析:

1
2
3
4
5
6
7
8
9
#if __IPC__ == IPC_NETLINK
ok = ccp_nl_sk(&ccp_read_msg);//creat a netlink,
//指定收到消息时的处理函数, 生成立 netlink的socket nl_sk, 一个全局变量, 位于 ccp_nl.cpp文件,
//struct sock *nl_sk;
if (ok < 0) {
return -1;
}

dp.send_msg = &nl_sendmsg;//Send serialized message to userspace CCP 指定从kernel发往userspace的发送函数,在ccp_nl.c函数中可以看到具体的发送流程

初始化内核内部ccp的框架

1
2
3
4
ok = ccp_init(&dp);//Initialize gloal state and allocate a map for ccp connections upon module load.
if (ok < 0) {
return -1;
}

调用内核接口注册新的拥塞控制算法

1
return tcp_register_congestion_control(&tcp_ccp_congestion_ops);

  1. ccp_init 函数: ccp_active_connections、 datapath、 datapath_programs

    1
    2
    3
    4
    5
    6
    7
    8
    datapath->set_cwnd           = dp->set_cwnd;
    datapath->set_rate_abs = dp->set_rate_abs;
    datapath->set_rate_rel = dp->set_rate_rel;
    datapath->send_msg = dp->send_msg;
    datapath->now = dp->now;
    datapath->since_usecs = dp->since_usecs;
    datapath->after_usecs = dp->after_usecs;
    datapath->impl = dp->impl;
  2. 分析tcp_ccp_congestion_ops的结构:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    struct tcp_congestion_ops tcp_ccp_congestion_ops = {
    .flags = TCP_CONG_NEEDS_ECN,
    .in_ack_event = tcp_ccp_in_ack_event,
    .name = "ccp",
    .owner = THIS_MODULE,
    .init = tcp_ccp_init,
    .release = tcp_ccp_release,
    .ssthresh = tcp_ccp_ssthresh,
    //.cong_avoid = tcp_ccp_cong_avoid,
    .cong_control = tcp_ccp_cong_control,
    .undo_cwnd = tcp_ccp_undo_cwnd,
    .set_state = tcp_ccp_set_state,
    .pkts_acked = tcp_ccp_pkts_acked
    };

    4.1 进入关键函数’tcp_ccp_cong_control‘进行分析: inet_csk_ca 函数的作用?;
    该函数调用函数 ccp_invoke, 在这之前先了解一下ccp_priv_state,ccp_connection结构体:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    // libccp Private State  
    // struct ccp_connection has a void* state to store libccp's state
    // libccp internally casts this to a struct ccp_priv_state*
    //
    struct ccp_priv_state {
    bool sent_create;
    u64 implicit_time_zero; // can be reset

    u16 program_index; // index into program array
    int staged_program_index;//什么意思?

    struct register_file registers;
    struct staged_update pending_update;
    };
    /*
    * CCP state per connection.
    * impl is datapath-specific, the rest are internal to libccp
    * for example, the linux kernel datapath uses impl to store a pointer to struct sock
    */
    struct ccp_connection {
    // the index of this array element
    u16 index;

    u32 last_create_msg_sent;

    // struct ccp_primitives is large; as a result, we store it inside ccp_connection to avoid
    // potential limitations in the datapath
    // datapath should update this before calling ccp_invoke()
    struct ccp_primitives prims;

    // constant flow-level information
    struct ccp_datapath_info flow_info;

    // private libccp state for the send machine and measurement machine
    void *state;

    // datapath-specific per-connection state
    void *impl;
    };

4.2 ccp_invoke 函数位于ccp.c文件,分析ccp_invoke函数的执行流程:

1
2
3
4
5
//Should be called along with the ACK clock.
//will invoke the send and measurement machines.
state = get_ccp_priv_state(conn);//获取connection的state
ok = send_conn_create(datapath, conn);//ccp.c, send create msg, 发送的消息内容,见下文
//至此datapath与userspace建立了连接

4.3 如果已经建立连接,从connnection中取出cwnd, snd_rate的值,放入相关的寄存器。检测相关相关寄存器的状态,impl_is_pending,若为真,写回相应的值.

1
2
3
4
5
6
7
 if (state->pending_update.impl_is_pending[CWND_REG]) {
DBG_PRINT("[sid=%d] Applying staged field update: cwnd reg <- %llu\n", conn->index, state->pending_update.impl_registers[CWND_REG]);
state->registers.impl_registers[CWND_REG] = state->pending_update.impl_registers[CWND_REG];
if (state->registers.impl_registers[CWND_REG] != 0) {
datapath->set_cwnd(datapath, conn, state->registers.impl_registers[CWND_REG]);
}
}

1
ok = state_machine(conn);// 进入状态机
1
2
3
4
5
6
7
8
9
//发送的消息格式
struct CreateMsg cr = {
.init_cwnd = conn->flow_info.init_cwnd,
.mss = conn->flow_info.mss,
.src_ip = conn->flow_info.src_ip,
.src_port = conn->flow_info.src_port,
.dst_ip = conn->flow_info.dst_ip,
.dst_port = conn->flow_info.dst_port,
};

4.4 进入machine.c ,分析state_machine函数。从connetciton中提取出state,从state中提取出program, 通过 process_expression()函数计算,将计算结果写入相关寄存器。 根据寄存器的结果,选择将相关的计算结果写入相应的地方。 即修改cwnd, rate_abs, 或者将测量结果通过函数 send_measurement()发送给ccp的userspace。

  1. 接下来分析一下kernel space 收到从user spcace中的消息时的行为:
    定位 ccp.c, 函数 ccp_read_msg(),可以发现回传的消息类型有三种,具体可见文件libcpp/serialize.c/read_header函数, 分别为INSTALL_EXPR、UPDATE_FIELDS、CHANGE_PROG三种类型。
    INSTALL_EXPR:INSTALL_EXPR message is for all flows, not a specific connection.
    安装program, 执行datapath_program_install函数。
    datapath_program_install函数: saves a new datapath program into the array of datapath programs; returns index into datapath program array where this program is stored; if there is no more space, returns -1;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    /* Callback to pass to IPC for incoming messages.
    * Cannot take ccp_connection as an argument, since it's a callback.
    * Therefore, must look up ccp_connction from socket_id.
    * buf: the received message, of size bufsize.
    */
    int ccp_read_msg(
    char *buf,
    int bufsize
    );

分析测量结果的得出与汇报,从下面代码段可以看出,测量结果存在 report registers中,通过检测 SHOULD_REPORT_REG 标志来决定是否发送测量结果给ccp user space.

1
2
3
4
if (state->registers.impl_registers[SHOULD_REPORT_REG]) {
send_measurement(conn, program->program_uid, state->registers.report_registers, program->num_to_return);
reset_state(state);
}

通过分析位于 machine.c 文件中的一下两个函数,可以发现这两个函数通过寄存器间的运算来进行相关测量工作,测量依据为program。

1
2
3
4
5
6
7
8
int process_expression(int expr_index, struct ccp_priv_state *state, struct ccp_primitives* primitives);
int process_instruction(int instr_index, struct ccp_priv_state *state, struct ccp_primitives* primitives)
//其他
struct Register {
u8 type;
int index;
u64 value;
};

Programe是通过user space下发过来的,前文已经分析了下发流程,给出program的结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
/*  Entire datapath program
* a set of expressions (conditions)
* a set of instructions
*/
struct DatapathProgram {
u8 num_to_return;
u16 index; // index in array
u32 program_uid; // program uid assigned by CCP agent
u32 num_expressions;
u32 num_instructions;
struct Expression expressions[MAX_EXPRESSIONS];
struct Instruction64 fold_instructions[MAX_INSTRUCTIONS];
};

Portus(控制平面,用户态) 代码阅读

Portus 简介

Portus is an implementation of a congestion control plane (CCP). It is a library that can be used to write new congestion control algorithms in user-space. Congestion control algorithm implementations live in independent crates which use this library for common functionality. Each algorithm crate provides a binary which runs a CCP with that algorithm activated.
注:Portus已有相关文档可供参考

libccp 简介

Libccp is an implementation of the core functionality necsesary for a datapath to communicate with a CCP process. The datapath is responsible for providing a few callback functions for modifying state internal to the datapath (e.g. congestion window or packet pacing rate) and a few utility functions and libccp handles everything else. The instructions below detail all of the steps necessary to make a datapath CCP compatible.

Reno算法在User Space的实现

  1. 文件定位: /src/reno.rs 该文件定义了 Reno算法类, 包括 set_cwnd increase reduction等函数;
  2. /bin/src/reno.rs,该文件是Reno算法的入口,给出Reno算法的相关配置。调用公共运行接口运行reno算法。

    1
    ccp_generic_cong_avoid::start::<Reno>(ipc.as_str(), log, cfg);
  3. /src/bin_helper.rs, 分析start函数。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    pub fn start<T: GenericCongAvoidAlg>(ipc: &str, log: slog::Logger, cfg: GenericCongAvoidConfig)
    "netlink" => {
    use portus::ipc::netlink::Socket;
    let b = Socket::<Blocking>::new()
    .map(|sk| BackendBuilder {sock: sk})
    .expect("ipc initialization");
    portus::run::<_, GenericCongAvoid<_, T>>(
    b,
    &portus::Config {
    logger: Some(log),
    config: cfg,
    }
    ).unwrap();
    }
  4. /src/lib.rs 文件, 分析run函数。

    1
    2
    pub fn run<I, U>(backend_builder:         BackendBuilder<I>, cfg: &Config<I, U>) -> Result<!>
    fn run_inner<I, U>(backend_builder: BackendBuilder<I>, cfg: &Config<I, U>, continue_listening: Arc<atomic::AtomicBool>) -> Result<()>
  5. 定位 portus/src/ipc/netlink.rs

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
      use super::Blocking;
    impl super::Ipc for Socket<Blocking> {
    fn recv(&self, buf: &mut [u8]) -> Result<usize> {
    self.__recv(buf, nix::sys::socket::MsgFlags::empty())
    }

    fn send(&self, buf: &[u8]) -> Result<()> {
    self.__send(buf)
    }

    fn close(&self) -> Result<()> {
    self.__close()
    }
    }
  6. 文件定位 ccp_generic_cong_avoid/src/lib.rs,以下代码给出了初始化时datapath向用户空间汇报测量结果的方法

    1
    2
    3
    4
    impl<T: Ipc, A: GenericCongAvoidAlg> GenericCongAvoid<T, A> {
    fn install_datapath_interval(&self, interval: time::Duration) -> Scop{}
    fn install_datapath_interval_rtt(&self) -> Scope {}
    fn install_ack_update(&self) -> Scope {}

以下代码指明了用户空间收到测量结果时的反应:

1
2
impl<T: Ipc, A: GenericCongAvoidAlg> CongAlg<T> for GenericCongAvoid<T, A> {}
fn on_report(&mut self, _sock_id: u32, m: Report)

  1. 中间分析, 到目前为止,已基本能掌握整个系统的工作流程,但是对于每个小模块的具体流程还有待梳理清楚。下面将着重分析几个子模块的工作流程。

CCP系统是如何实现测量的?

即对于每一个测量元素(rtt,ack,loss等)是如何实现测量的,在ccp系统中,这些测量元素被称为primitives.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*
* Primitive registers
*/
#define ACK_BYTES_ACKED 0
#define ACK_BYTES_MISORDERED 1
#define ACK_ECN_BYTES 2
#define ACK_ECN_PACKETS 3
#define ACK_LOST_PKTS_SAMPLE 4
#define ACK_NOW 5
#define ACK_PACKETS_ACKED 6
#define ACK_PACKETS_MISORDERED 7
#define FLOW_BYTES_IN_FLIGHT 8
#define FLOW_BYTES_PENDING 9
#define FLOW_PACKETS_IN_FLIGHT 10
#define FLOW_RATE_INCOMING 11
#define FLOW_RATE_OUTGOING 12
#define FLOW_RTT_SAMPLE_US 13
#define FLOW_WAS_TIMEOUT 14

定位函数 tcp_ccp.c 文件中的 load_primitives 函数。在这个函数中,给出了如何测量primitives, 其取决于重要的结构体 sock,需要详细分析。

1
int load_primitives(struct sock *sk, const struct rate_sample *rs);

上述函数的结构体来自于

1
void tcp_ccp_cong_control(struct sock *sk, const struct rate_sample *rs);

而上述函数来自于注册函数的接口

1
2
3
4
5
struct tcp_congestion_ops tcp_ccp_congestion_ops = {
//...
.cong_control = tcp_ccp_cong_control,//cong_control 提供给tcp_ccp_cong_contro 需要的参数
// ...
};

至此,我们知道了测量primitives时用到的关键数据结构来自于内核。